package flinkdeploylogic

import (
	"context"
	"errors"
	"fmt"
	"github.com/zeromicro/go-zero/core/logx"
	"sync"
	"time"
	"yunzhan/common/models"
	utils "yunzhan/common/utils"
	"yunzhan/rpc-server/internal/svc"
	agent "yunzhan/rpc-server/pb"
)

// DeployFLinkYarnLogic holds the per-request state used to handle a
// Flink-on-YARN deployment request.
type DeployFLinkYarnLogic struct {
	// ctx is the context supplied to NewDeployFLinkYarnLogic; the embedded
	// Logger is derived from it.
	ctx context.Context
	// svcCtx gives access to service-wide dependencies such as Config.
	svcCtx *svc.ServiceContext
	// Logger is embedded so logging methods can be called directly on the logic.
	logx.Logger
}

// NewDeployFLinkYarnLogic returns a DeployFLinkYarnLogic bound to ctx and
// svcCtx, with its embedded logger created from ctx.
func NewDeployFLinkYarnLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeployFLinkYarnLogic {
	logic := new(DeployFLinkYarnLogic)
	logic.ctx = ctx
	logic.svcCtx = svcCtx
	logic.Logger = logx.WithContext(ctx)
	return logic
}

// DeployFLinkYarn accepts a Flink-on-YARN deployment request and starts the
// deployment in the background.
//
// When the request is nil or lacks node information or component
// configuration, it returns a response with Code 500 together with a non-nil
// error. Otherwise it launches startDeployment in a goroutine and immediately
// returns a Code 200 response acknowledging receipt; the outcome of the
// deployment itself is not reflected in the return value.
func (l *DeployFLinkYarnLogic) DeployFLinkYarn(in *agent.DeployFlinkRequest) (*agent.DeployResponse, error) {
	if in == nil || in.NodeInfo == nil {
		const msg = "未配置节点信息, 无法执行Flink集群部署"
		return &agent.DeployResponse{
			Code:    500,
			Message: msg,
		}, errors.New(msg)
	}

	// ConfigInfo is dereferenced below and throughout the deployment, so a
	// request without it is rejected here instead of panicking later.
	if in.ConfigInfo == nil {
		const msg = "未配置组件信息, 无法执行Flink集群部署"
		return &agent.DeployResponse{
			Code:    500,
			Message: msg,
		}, errors.New(msg)
	}

	resp := &agent.DeployResponse{
		Code:    200,
		Message: fmt.Sprintf("请求已接收，正在部署 %s 到节点 %s", in.ConfigInfo.ComponentName, in.NodeInfo.Host),
	}

	// The deployment is long-running; it continues after this call returns.
	go l.startDeployment(in)

	return resp, nil
}

// startDeployment performs the Flink-on-YARN client installation described by
// in and reports progress through a LogManager. It blocks until the work has
// finished and returns nothing; failures are only visible in the emitted logs.
//
// Steps, in order: make sure the install directory exists, extract the package
// archive into it, chown the extracted directory to the node user, run
// yarn-session.sh as the validation step, look up the local IP and hostname,
// and send a ComponentInfo record for the "Client" role. A failing step is
// logged at ERROR level and aborts the remaining steps.
//
// NOTE(review): the shell commands are assembled from request fields (paths,
// archive name, user name) without any escaping. If these values can come from
// an untrusted caller this allows command injection — confirm the trust
// boundary or add shell quoting.
func (l *DeployFLinkYarnLogic) startDeployment(in *agent.DeployFlinkRequest) {
	// A nil dereference inside the goroutine below would crash the whole
	// process, so incomplete requests are rejected up front.
	if in == nil || in.NodeInfo == nil || in.ConfigInfo == nil {
		logx.Errorf("Flink YARN 部署请求缺少节点信息或组件配置信息, 已终止部署")
		return
	}

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()

		// Both managers post to the same API server, so the URL is built once.
		apiServerURL := fmt.Sprintf("http://%s:%d", l.svcCtx.Config.RestServices["apiServer"].RestConf.Host, l.svcCtx.Config.RestServices["apiServer"].RestConf.Port)

		logManager := models.NewLogManager(100, apiServerURL)
		defer logManager.Close()

		flushInterval := 5 * time.Second // flush interval
		maxBatchSize := 100              // maximum batch size
		retryDelay := 2 * time.Second    // delay between retries
		maxRetries := 3                  // maximum number of retries

		componentInfoManager := models.NewComponentInfoManager(
			1000,
			apiServerURL,
			flushInterval,
			maxBatchSize,
			retryDelay,
			maxRetries)
		defer componentInfoManager.Close()

		step := 1

		// logInfo and logError send a message tagged with the current step.
		logInfo := func(message string) {
			logManager.SendLog(in.ClusterID, in.NodeInfo.Host, step, "INFO", message, "Flink", "Server")
		}
		logError := func(message string) {
			logManager.SendLog(in.ClusterID, in.NodeInfo.Host, step, "ERROR", message, "Flink", "Server")
		}

		// runStep advances the step counter, announces the step, executes
		// command and logs the outcome. It reports whether the command succeeded.
		runStep := func(startMsg, command, failLabel, doneMsg string) bool {
			step++
			logInfo(startMsg)
			output, err := utils.ExecCommand(command)
			if err != nil {
				logError(fmt.Sprintf("%s: %v, output: %s", failLabel, err, output))
				return false
			}
			logInfo(doneMsg)
			return true
		}

		rtrimDir := utils.Rtrim(in.ConfigInfo.PackagePath, "/")
		// ComponentName is used as the archive file name below.
		flinkVersion := in.ConfigInfo.ComponentName
		installPath := utils.Rtrim(in.ConfigInfo.InstallPath, "/")
		logInfo(fmt.Sprintf("安装包存放目录: %s, Flink version: %s, Flink 安装目录: %s", rtrimDir, flinkVersion, installPath))

		checkPathCommand := fmt.Sprintf(`
if [ ! -d "%s" ]; then
   sudo mkdir -p "%s"
fi
`, installPath, installPath)
		if !runStep("执行组件安装路径检查...", checkPathCommand, "检查组件安装路径异常", "检查组件安装路径完成") {
			return
		}

		tarCommand := fmt.Sprintf("sudo tar -xzf %s/%s -C %s", rtrimDir, flinkVersion, installPath)
		if !runStep("解压安装包...", tarCommand, "解压安装包异常", "解压安装包完成") {
			return
		}

		// NOTE(review): assumes utils.Rtrim removes the exact ".tar.gz" suffix
		// rather than a set of trailing characters — confirm against utils.
		flinkHome := utils.Rtrim(flinkVersion, ".tar.gz")
		home := fmt.Sprintf("%s/%s", installPath, flinkHome)

		chownCommand := fmt.Sprintf("sudo chown -R %s:%s %s", in.NodeInfo.Username, in.NodeInfo.Username, home)
		if !runStep("修改组件所属用户组...", chownCommand, "修改组件所属用户组异常", "修改组件所属用户组完成") {
			return
		}

		flinkValidateCommand := fmt.Sprintf("sudo -u %s %s/bin/yarn-session.sh -n 1 -d", in.NodeInfo.Username, home)
		if !runStep("验证Flink客户端配置...", flinkValidateCommand, "验证Flink客户端配置异常", "验证Flink客户端配置完成") {
			return
		}

		logInfo("Flink客户端部署完成")

		// Each lookup checks its own error; previously the stale error of the
		// last shell command was tested, so lookup failures went unnoticed.
		currentIP, ipErr := utils.GetCurrentInternalIP()
		if ipErr != nil {
			logError(fmt.Sprintf("%s: 无法获取当前主机IP: %v", in.NodeInfo.Host, ipErr))
			return
		}
		currentHost, hostErr := utils.GetCurrentHostname()
		if hostErr != nil {
			logError(fmt.Sprintf("%s: 无法获取当前主机名: %v", in.NodeInfo.Host, hostErr))
			return
		}

		flinkInfo := &models.ComponentInfo{
			ClusterID:      in.ClusterID,
			ComponentName:  "Flink",
			Version:        in.Version,
			NodeHost:       currentHost,
			NodeIP:         currentIP,
			ComponentRole:  "Client",
			HomePath:       home,
			DataStorageDir: "",
			Status:         true,
			AdditionalInfo: fmt.Sprintf("config_file=%s/conf/flink-conf.yaml", home),
		}

		componentInfoManager.SendComponentInfo(flinkInfo)
	}()
	// Block until the deployment goroutine has finished and its deferred
	// Close calls have run.
	wg.Wait()
}
